/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (myaniu@gmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.reflections.Reflections;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

import com.jfinal.kit.StrKit;
import com.jfinal.log.Log;
import com.jfinal.plugin.IPlugin;

import io.zbus.mq.Broker;
import io.zbus.mq.Consumer;
import io.zbus.mq.ConsumerConfig;
import io.zbus.mq.Producer;
import io.zbus.mq.ProducerConfig;
import io.zbus.mq.server.MqServer;
import io.zbus.mq.server.MqServerConfig;

/**
 * @ClassName: ZbusPlugin
 * @Description: JFinal的Zbus插件实现
 * @author 李飞
 * @date 2015年7月29日 下午12:46:32
 * @since V1.0.0
 */
public class ZbusPlugin implements IPlugin {

    /**
     * 日志
     */
    private static final Log LOG = Log.getLog("zbus");

    /**
     * Topic消费者配置Map topic -group- TMsgHandler
     */
    private final Map<String, Map<String, TMsgHandler<?>>> topicGroupTMsgHandlerMap = new HashMap<String, Map<String, TMsgHandler<?>>>();

    /**
     * 消费者列表
     */
    private final List<Consumer> consumerList = new ArrayList<Consumer>();

    /**
     * 发送器列表
     */
    private final static List<Sender<?>> senderList = new ArrayList<Sender<?>>();

    private static MqServer server = null;
    private static Broker broker = null;

    private final String trackerList;

    private static String token;

    private final String scanRootPackage;


    public ZbusPlugin() {
        this("jvm");
    }

    public ZbusPlugin(String trackerList) {
        this(trackerList, null);
    }

    public ZbusPlugin(String trackerList, String scanRootPackage) {
        this(trackerList, scanRootPackage, "W@#Rs1$T");
    }

    public ZbusPlugin(String trackerList, String scanRootPackage, String token) {
        this.trackerList = trackerList;
        this.scanRootPackage = scanRootPackage;
        ZbusPlugin.token = token;
    }

    /**
     * @Title: createProducer
     * @Description: 创建一个Producer
     * @return Producer
     * @throws InterruptedException
     * @throws IOException
     * @since V1.0.0
     */
    public static Producer createProducer(Sender<?> sender, String topic)
            throws IOException, InterruptedException {
        ProducerConfig config = new ProducerConfig();
        config.setBroker(broker);
        config.setToken(token);
        Producer producer = new Producer(config);
        producer.declareTopic(topic);
        senderList.add(sender);
        return producer;
    }


    /**
     * 注册消息处理器到默认topic
     * @param msgHandler 消息处理器
     */
    public void registerMsgHandler(TMsgHandler<?> msgHandler) {
        this.registerMsgHandler(Constants.DEF_TOPIC, msgHandler);
    }

    /**
     * 注册消息处理器到指定topic
     * @param topic 指定topic
     * @param msgHandler 消息处理器
     */
    public void registerMsgHandler(String topic, TMsgHandler<?> msgHandler) {
        // 依据mq获得 topic－TMsgHandler映射map
        Map<String, TMsgHandler<?>> tmc = this.topicGroupTMsgHandlerMap.get(topic);
        if (null == tmc) {
            tmc = new HashMap<String, TMsgHandler<?>>();
            this.topicGroupTMsgHandlerMap.put(topic, tmc);
        }
        //tag名即是groupName
        String groupName = msgHandler.getTag();
        tmc.put(groupName, msgHandler);
        LOG.info("注册消息处理器成功( topic=" + topic + ",handler=" + msgHandler.getClass().getName() + " )");
    }

    /**
     * @Title: ensureBroker
     * @Description: 确保broker可用
     * @throws Exception
     * @since V1.0.0
     */
    private void ensureBroker() throws Exception {
        if (broker == null) {
            synchronized (this) {
                if (broker == null) {
                    if ("jvm".equals(trackerList)) {
                        //创建本地进程内broker
                        MqServerConfig config = new MqServerConfig();
                        //默认存储在当前路径下
                        config.setMqPath(".zbus-db");
                        //只监听本地端口，默认最安全设定
                        config.setServerHost("127.0.0.1");
                        server = new MqServer(config);
                        server.start();
                        broker = new Broker(server);
                    } else {
                        broker = new Broker(this.trackerList);
                    }
                    LOG.info("创建broker成功");
                }
            }
        }
    }

    private void startTopicConsumer() throws IOException {
        // 创建topic消费者
        for (Entry<String, Map<String, TMsgHandler<?>>> mqConfig : this.topicGroupTMsgHandlerMap.entrySet()) {
            String topic = mqConfig.getKey();
            // groupName <－> TMsgHandler 映射map
            Map<String, TMsgHandler<?>> tmt = mqConfig.getValue();
            for (Entry<String, TMsgHandler<?>> topicConfig : tmt.entrySet()) {
                String groupName = topicConfig.getKey();
                TMsgHandler<?> m = topicConfig.getValue();
                ConsumerConfig config = new ConsumerConfig(broker);
                config.setToken(ZbusPlugin.token);
                config.setTopic(topic);
                //tag即是filter
                config.setConsumeGroup(groupName, m.getTag());
                config.setConnectionCount(1);
                config.setMessageHandler(m);
                Consumer c = new Consumer(config);
                c.start();
                consumerList.add(c);
                LOG.info("启动消息处理器成功( topic=" + topic + ", group=" + groupName + ", handler=" + m.getClass().getName() + " )");
            }
        }
    }

    @Override
    public boolean start() {
        try {
            // 确保创建
            ensureBroker();
            autoLoadByAnnotation();
            startTopicConsumer();
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public boolean stop() {
        try {
            // 关闭消费者
            for (Consumer c : consumerList) {
                c.close();
                c = null;
            }
            consumerList.clear();
            // 关闭所有发送器
            for (Sender<?> sender : senderList) {
                sender.close();
            }
            senderList.clear();

            // 关闭broker
            if (broker != null) {
                broker.close();
                broker = null;
            }
            //关闭本地服务
            if (server != null) {
                server.close();
                server = null;
            }
            return true;
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
            return false;
        }
    }

    @SuppressWarnings("rawtypes")
    private void autoLoadByAnnotation() {
        if (StrKit.isBlank(this.scanRootPackage)) {
            return;
        }
        Reflections reflections = new Reflections(new ConfigurationBuilder()
                .addUrls(ClasspathHelper.forClass(TMsgHandler.class)).forPackages(scanRootPackage.split(","))
                .setScanners(new SubTypesScanner(), new TypeAnnotationsScanner()));

        Set<Class<? extends TMsgHandler>> handlerClasses = reflections.getSubTypesOf(TMsgHandler.class);
        for (Class<? extends TMsgHandler> mc : handlerClasses) {
            TMsgHandler<?> hander = null;
            Handler handlerAnn = mc.getAnnotation(Handler.class);
            if (null == handlerAnn) {
                continue;
            }
            //初始化执行器
            try {
                hander = (TMsgHandler<?>) mc.newInstance();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                throw new RuntimeException(e.getMessage(), e);
            }
            String topic = handlerAnn.topic();
            if (StrKit.isBlank(topic)) {
                throw new RuntimeException("topic不能为空：" + mc.getName());
            }

            this.registerMsgHandler(topic, hander);

        } // end for
    }
}
